package org.rocketmq.example.producer;

import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.rocketmq.template.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author: zd
 * @Date: 2023-02-06 22:04
 * @Description: Example usages of the producer side of {@link RocketMQTemplate}.
 *
 * <p>Every endpoint publishes to the same topic. Unless stated otherwise, an endpoint answers
 * {@code "success"} when all send calls returned normally and {@code "fail"} when a send call
 * threw; the exception is additionally printed to {@code System.err}.
 */
@RequestMapping("/producer")
@RestController
public class ProducerExample {

    /** Topic that every example endpoint publishes to. */
    private static final String TOPIC = "MyTopicTest";

    /** Response body used when no send call threw. */
    private static final String SUCCESS = "success";

    /** Response body used when a send call threw. */
    private static final String FAILURE = "fail";

    /** One minute in milliseconds; added to "now" to build a delivery timestamp. */
    private static final long ONE_MINUTE_MILLIS = 60L * 1000;

    /** Number of messages built by the batch example. */
    private static final int BATCH_SIZE = 50000;

    /**
     * Queue selector for the ordered-message example: maps the selector argument (the order id)
     * onto one of the available queues so that equal ids always pick the same queue index.
     *
     * <p>{@link Math#floorMod(long, long)} keeps the index non-negative even for negative ids,
     * and going through {@link Number} accepts any boxed integral id type.
     */
    private static final MessageQueueSelector ORDER_ID_SELECTOR = new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            long orderId = ((Number) arg).longValue();
            long index = Math.floorMod(orderId, (long) mqs.size());
            return mqs.get((int) index);
        }
    };

    @Resource
    private RocketMQTemplate template;

    /**
     * Synchronous send.
     * Consumers: ConsumerExample1, ConsumerExample2.
     *
     * @param msg payload used as the base of every message sent by this endpoint
     * @return the string form of the first {@link SendResult} when all sends returned normally,
     *         {@code "fail"} when any send threw
     */
    @GetMapping("/send")
    public String test1(@RequestParam String msg) {
        try {
            // Unix timestamp one minute from now, passed as the delivery time
            Long deliverTimeStamp = System.currentTimeMillis() + ONE_MINUTE_MILLIS;

            // Plain message
            SendResult send = template.send(TOPIC, msg, 0);
            System.out.println("消息发送成功==> " + send);

            // Message with a delivery timestamp
            template.send(TOPIC, msg + "_延迟消息", deliverTimeStamp);

            // Note: consumer subscriptions must be consistent,
            // see <a href="https://rocketmq.apache.org/zh/docs/bestPractice/05subscribe">...</a>
            template.send(TOPIC, "TAG1", msg + "_TAG1", 0);
            template.send(TOPIC, "TAG2", msg + "_TAG2", 0);
            template.send(TOPIC, "TAG1", "KEYS1", msg + "KEYS1", 0, 0);

            // Delayed by 30s according to the original author's note (fifth argument is 4)
            template.send(TOPIC, "TAG2", "KEYS2", msg + "_TAG2", 4, 0);

            return String.valueOf(send);
        } catch (Exception e) {
            System.err.println("消息发送失败==> " + e.getMessage());
            return FAILURE;
        }
    }

    /**
     * Asynchronous send.
     * Consumers: ConsumerExample1, ConsumerExample2.
     *
     * @param msg payload used as the base of every message sent by this endpoint
     * @return {@code "success"} when both {@code sendAsync} calls returned normally (the actual
     *         outcome is reported through the callback), {@code "fail"} when a call threw
     */
    @GetMapping("/sendAsync")
    public String test2(@RequestParam String msg) {
        try {
            template.sendAsync(TOPIC, "TAG1", msg + "_TAG1", loggingCallback(), 0);
            template.sendAsync(TOPIC, "TAG2", msg + "_TAG2", loggingCallback(), 0);
            return SUCCESS;
        } catch (Exception e) {
            System.err.println("消息发送失败==> " + e.getMessage());
            return FAILURE;
        }
    }

    /**
     * One-way send.
     * Consumers: ConsumerExample1, ConsumerExample2.
     *
     * @param msg payload used as the base of every message sent by this endpoint
     * @return {@code "success"} when all calls returned normally, {@code "fail"} when one threw
     */
    @GetMapping("/sendOneway")
    public String test3(@RequestParam String msg) {
        try {
            // Unix timestamp one minute from now, passed as the delivery time
            Long deliverTimeStamp = System.currentTimeMillis() + ONE_MINUTE_MILLIS;
            template.sendOneway(TOPIC, msg + "_延迟消息", deliverTimeStamp);
            template.sendOneway(TOPIC, "TAG1", msg + "_TAG1", 0);
            template.sendOneway(TOPIC, "TAG2", msg + "_TAG2", 0);
            return SUCCESS;
        } catch (Exception e) {
            System.err.println("消息发送失败==> " + e.getMessage());
            return FAILURE;
        }
    }

    /**
     * Broadcast messages.
     * Consumers: ConsumerExample3, ConsumerExample4.
     *
     * @param msg payload prefix; three messages {@code msg_0 .. msg_2} are sent with tag TAG3
     * @return {@code "success"} when all sends returned normally, {@code "fail"} when one threw
     */
    @GetMapping("/send4")
    public String test4(@RequestParam String msg) {
        try {
            // Broadcast-mode test
            for (int i = 0; i < 3; i++) {
                template.send(TOPIC, "TAG3", msg + "_" + i, 0);
            }
            return SUCCESS;
        } catch (Exception e) {
            System.err.println("消息发送失败==> " + e.getMessage());
            return FAILURE;
        }
    }

    /**
     * Ordered messages.
     * Consumer: ConsumerExample5.
     *
     * <p>Sends one message per entry of {@code Order.getOrderList()}, choosing the queue from
     * the order id via {@link #ORDER_ID_SELECTOR}.
     *
     * @return {@code "success"} when every message was sent
     * @throws RuntimeException wrapping the first exception thrown while sending
     */
    @GetMapping("/send5")
    public String test5() {
        List<Order> orderList = Order.getOrderList();
        for (Order order : orderList) {
            try {
                String body = "[" + order + order.getOrderStatus() + "] 订单状态变更顺序消息测试";
                // The order id is handed to the selector as its argument
                SendResult sendResult =
                        template.send(TOPIC, "TAG5", body, ORDER_ID_SELECTOR, order.getOrderId());
                System.out.printf("消息发送状态:%s, orderId:%s, queueId:%d, msg:%s%n",
                        sendResult.getSendStatus(),
                        order.getOrderId(),
                        sendResult.getMessageQueue().getQueueId(), body);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return SUCCESS;
    }

    /**
     * Batch messages.
     * Consumer: ConsumerExample.
     *
     * @return {@code "success"} when the batch call returned normally, {@code "fail"} when it threw
     */
    @GetMapping("/send6")
    public String test6() {
        try {
            List<Message> messages = new ArrayList<>(BATCH_SIZE);
            for (int i = 1; i <= BATCH_SIZE; i++) {
                String messageBody = "测试批量发送消息第" + i + "条消息";
                messages.add(new Message(TOPIC, "TAG1", messageBody.getBytes(StandardCharsets.UTF_8)));
            }

            // NOTE(review): all messages are handed over in a single call; whether sendBatch
            // splits the list to respect the broker's batch size limit is not visible here — confirm.
            template.sendBatch(messages);
            return SUCCESS;
        } catch (Exception e) {
            System.err.println("消息发送失败==> " + e.getMessage());
            return FAILURE;
        }
    }

    /**
     * Transactional messages.
     *
     * @return {@code "success"} when all five sends returned normally, {@code "fail"} when one threw
     */
    @GetMapping("/send7")
    public String test7() {
        try {
            for (int i = 1; i <= 5; i++) {
                String messageBody = "测试事务消息" + i + "条消息";
                Message message = new Message(TOPIC, "TAG6", messageBody.getBytes(StandardCharsets.UTF_8));
                TransactionSendResult transactionSendResult = template.sendTransactionMsg(message);
                System.out.println("消息发送状态==>" + transactionSendResult);
            }
            return SUCCESS;
        } catch (Exception e) {
            System.err.println("消息发送失败==> " + e.getMessage());
            return FAILURE;
        }
    }

    /** Builds a callback that prints the async send outcome to stdout / stderr. */
    private static SendCallback loggingCallback() {
        return new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("异步消息发送成功==> " + sendResult);
            }

            @Override
            public void onException(Throwable e) {
                System.err.println("异步消息发送失败==> " + e);
            }
        };
    }
}
